package com.pr.springbootthmeleaf;

import org.junit.Test;

import rx.Observable;
import rx.Subscriber;
import rx.Subscription;
import rx.functions.Action1;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class AsyncProgram {

    public static void main(String[] args) {
        //两个线程的线程池
        ExecutorService executor = Executors.newFixedThreadPool(2);
        //小红买酒任务，这里的future2代表的是小红未来发生的操作，返回小红买东西这个操作的结果
        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
            System.out.println("爸：小红你去买瓶酒！");
            try {
                System.out.println("小红出去买酒了，女孩子跑的比较慢，估计5s后才会回来...");
                Thread.sleep(5000);
                return "我买回来了！";
            } catch (InterruptedException e) {
                System.err.println("小红路上遭遇了不测");
                return "来世再见！";
            }
        }, executor);

        //小明买烟任务，这里的future1代表的是小明未来买东西会发生的事，返回值是小明买东西的结果
        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
            System.out.println("爸：小明你去买包烟！");
            try {
                System.out.println("小明出去买烟了，可能要3s后回来...");
                Thread.sleep(3000);
                return "我买回来了!";
            } catch (InterruptedException e) {
                System.out.println("小明路上遭遇了不测！");
                return "这是我托人带来的口信，我已经不在了。";
            }
        }, executor);

        //获取小红买酒结果，从小红的操作中获取结果，把结果打印
        future2.thenAccept((e) -> {
            System.out.println("小红说：" + e);
        });
        //获取小明买烟的结果
        future1.thenAccept((e) -> {
            System.out.println("小明说：" + e);
        });

        System.out.println("爸：loading......");
        System.out.println("爸:我觉得无聊甚至去了趟厕所。");
        System.out.println("爸：loading......");
    }

    @Test
    public void observableTest() {
        Observable<String> observable = Observable.just("one", "two", "three");

        Observable<String> stringObservable = Observable.create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {
                subscriber.onNext("hello,reactive world");
                subscriber.onCompleted();
            }
        });

        Observable<String> objectObservable = Observable.create(subscriber -> {
            subscriber.onNext("hello,reactive world");
            subscriber.onCompleted();
        });

        Subscription subscribe = observable.subscribe(new Subscriber<String>() {
            @Override
            public void onCompleted() {
                System.out.println("onCompleted");
            }

            @Override
            public void onError(Throwable throwable) {
                System.out.println("onError");
            }

            @Override
            public void onNext(String s) {
                System.out.println("Item is:" + s);
            }
        });

        Subscriber<String> subscriber2 = new Subscriber<>() {
            @Override
            public void onCompleted() {
                System.out.println("Done");
            }

            @Override
            public void onError(Throwable throwable) {
                System.out.println(throwable);
            }

            @Override
            public void onNext(String s) {
                System.out.println(s);
            }
        };

        stringObservable.subscribe(subscriber2);
        objectObservable.subscribe(subscriber2);

        Observable.create(subscriber -> {
            subscriber.onNext("Hello,Reactive World");
            subscriber.onCompleted();
        }).subscribe(
                System.out::println,
                System.err::println,
                () -> System.out.println("Done")
        );
    }
}
